/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.testing;



import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.io.BoundedSource;

import com.bff.gaia.unified.sdk.io.Source;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import org.joda.time.Instant;

import org.junit.Assert;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.Nullable;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.NoSuchElementException;

import java.util.Objects;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;

import static org.hamcrest.Matchers.containsInAnyOrder;

import static org.hamcrest.Matchers.equalTo;

import static org.junit.Assert.*;



/**

 * Helper functions and test harnesses for checking correctness of {@link Source} implementations.

 *

 * <p>Contains a few lightweight utilities (e.g. reading items from a source or a reader, such as

 * {@link #readFromSource} and {@link #readFromUnstartedReader}), as well as heavyweight property

 * testing and stress testing harnesses that help getting a large amount of test coverage with few

 * code. Most notable ones are:

 *

 * <ul>

 *   <li>{@link #assertSourcesEqualReferenceSource} helps testing that the data read by the union of

 *       sources produced by {@link BoundedSource#split} is the same as data read by the original

 *       source.

 *   <li>If your source implements dynamic work rebalancing, use the {@code assertSplitAtFraction}

 *       family of functions - they test behavior of {@link

 *       BoundedSource.BoundedReader#splitAtFraction}, in particular, that various consistency

 *       properties are respected and the total set of data read by the source is preserved when

 *       splits happen. Use {@link #assertSplitAtFractionBehavior} to test individual cases of

 *       {@code splitAtFraction} and use {@link #assertSplitAtFractionExhaustive} as a heavy-weight

 *       stress test including concurrency. We strongly recommend to use both.

 * </ul>

 *

 * For example usages, see the unit tests of classes such as {@code AvroSource} or {@code

 * TextSource}.

 *

 * <p>Like {@link PAssert}, requires JUnit and Hamcrest to be present in the classpath.

 */

public class SourceTestUtils {

  private static final Logger LOG = LoggerFactory.getLogger(SourceTestUtils.class);



  // A wrapper around a value of type T that compares according to the structural

  // value provided by a Coder<T>, but prints both the original and structural value,

  // to help get good error messages from JUnit equality assertion failures and such.

  private static class ReadableStructuralValue<T> {

    private T originalValue;

    private Object structuralValue;



    public ReadableStructuralValue(T originalValue, Object structuralValue) {

      this.originalValue = originalValue;

      this.structuralValue = structuralValue;

    }



    @Override

    public int hashCode() {

      return Objects.hashCode(structuralValue);

    }



    @Override

    public boolean equals(Object obj) {

      if (obj == null || !(obj instanceof ReadableStructuralValue)) {

        return false;

      }

      return Objects.equals(structuralValue, ((ReadableStructuralValue) obj).structuralValue);

    }



    @Override

    public String toString() {

      return String.format("[%s (structural %s)]", originalValue, structuralValue);

    }

  }



  /**

   * Testing utilities below depend on standard assertions and matchers to compare elements read by

   * sources. In general the elements may not implement {@code equals}/{@code hashCode} properly,

   * however every source has a {@link Coder} and every {@code Coder} can produce a {@link

   * Coder#structuralValue} whose {@code equals}/{@code hashCode} is consistent with equality of

   * encoded format. So we use this {@link Coder#structuralValue} to compare elements read by

   * sources.

   */

  public static <T> List<ReadableStructuralValue<T>> createStructuralValues(

	  Coder<T> coder, List<T> list) throws Exception {

    List<ReadableStructuralValue<T>> result = new ArrayList<>();

    for (T elem : list) {

      result.add(new ReadableStructuralValue<>(elem, coder.structuralValue(elem)));

    }

    return result;

  }



  /** Reads all elements from the given {@link BoundedSource}. */

  public static <T> List<T> readFromSource(BoundedSource<T> source, PipelineOptions options)

      throws IOException {

    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {

      return readFromUnstartedReader(reader);

    }

  }



  public static <T> List<T> readFromSplitsOfSource(

	  BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options)

      throws Exception {

    List<T> res = Lists.newArrayList();

    for (BoundedSource<T> split : source.split(desiredBundleSizeBytes, options)) {

      res.addAll(readFromSource(split, options));

    }

    return res;

  }



  /** Reads all elements from the given unstarted {@link Source.Reader}. */

  public static <T> List<T> readFromUnstartedReader(Source.Reader<T> reader) throws IOException {

    return readRemainingFromReader(reader, false);

  }



  /** Reads all elements from the given started {@link Source.Reader}. */

  public static <T> List<T> readFromStartedReader(Source.Reader<T> reader) throws IOException {

    return readRemainingFromReader(reader, true);

  }



  /** Read elements from a {@link Source.Reader} until n elements are read. */

  public static <T> List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n)

      throws IOException {

    return readNItemsFromReader(reader, n, false);

  }



  /**

   * Read elements from a {@link Source.Reader} that has already had {@link Source.Reader#start}

   * called on it, until n elements are read.

   */

  public static <T> List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n)

      throws IOException {

    return readNItemsFromReader(reader, n, true);

  }



  /**

   * Read elements from a {@link Source.Reader} until n elements are read.

   *

   * <p>There must be at least n elements remaining in the reader, except for the case when n is

   * {@code Integer.MAX_VALUE}, which means "read all remaining elements".

   */

  private static <T> List<T> readNItemsFromReader(Source.Reader<T> reader, int n, boolean started)

      throws IOException {

    List<T> res = new ArrayList<>();

    for (int i = 0; i < n; i++) {

      boolean shouldStart = (i == 0 && !started);

      boolean more = shouldStart ? reader.start() : reader.advance();

      if (n != Integer.MAX_VALUE) {

        assertTrue(more);

      }

      if (!more) {

        break;

      }

      res.add(reader.getCurrent());

    }

    return res;

  }



  /** Read all remaining elements from a {@link Source.Reader}. */

  public static <T> List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started)

      throws IOException {

    return readNItemsFromReader(reader, Integer.MAX_VALUE, started);

  }



  /**

   * Given a reference {@code Source} and a list of {@code Source}s, assert that the union of the

   * records read from the list of sources is equal to the records read from the reference source.

   */

  public static <T> void assertSourcesEqualReferenceSource(

      BoundedSource<T> referenceSource,

      List<? extends BoundedSource<T>> sources,

      PipelineOptions options)

      throws Exception {

    Coder<T> coder = referenceSource.getOutputCoder();

    List<T> referenceRecords = readFromSource(referenceSource, options);

    List<T> bundleRecords = new ArrayList<>();

    for (BoundedSource<T> source : sources) {

      assertThat(

          "Coder type for source "

              + source

              + " is not compatible with Coder type for referenceSource "

              + referenceSource,

          source.getOutputCoder(),

          equalTo(coder));

      List<T> elems = readFromSource(source, options);

      bundleRecords.addAll(elems);

    }

    List<ReadableStructuralValue<T>> bundleValues = createStructuralValues(coder, bundleRecords);

    List<ReadableStructuralValue<T>> referenceValues =

        createStructuralValues(coder, referenceRecords);

    assertThat(bundleValues, containsInAnyOrder(referenceValues.toArray()));

  }



  /**

   * Assert that a {@code Reader} returns a {@code Source} that, when read from, produces the same

   * records as the reader.

   */

  public static <T> void assertUnstartedReaderReadsSameAsItsSource(

	  BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {

    Coder<T> coder = reader.getCurrentSource().getOutputCoder();

    List<T> expected = readFromUnstartedReader(reader);

    List<T> actual = readFromSource(reader.getCurrentSource(), options);

    List<ReadableStructuralValue<T>> expectedStructural = createStructuralValues(coder, expected);

    List<ReadableStructuralValue<T>> actualStructural = createStructuralValues(coder, actual);

    assertThat(actualStructural, containsInAnyOrder(expectedStructural.toArray()));

  }



  /**

   * Expected outcome of {@link BoundedSource.BoundedReader#splitAtFraction}.

   */

  public enum ExpectedSplitOutcome {

    /** The operation must succeed and the results must be consistent. */

    MUST_SUCCEED_AND_BE_CONSISTENT,

    /** The operation must fail (return {@code null}). */

    MUST_FAIL,

    /** The operation must either fail, or succeed and the results be consistent. */

    MUST_BE_CONSISTENT_IF_SUCCEEDS

  }



  /**

   * Contains two values: the number of items in the primary source, and the number of items in the

   * residual source, -1 if split failed.

   */

  private static class SplitAtFractionResult {

    public int numPrimaryItems;

    public int numResidualItems;



    public SplitAtFractionResult(int numPrimaryItems, int numResidualItems) {

      this.numPrimaryItems = numPrimaryItems;

      this.numResidualItems = numResidualItems;

    }

  }



  /**

   * Asserts that the {@code source}'s reader either fails to {@code splitAtFraction(fraction)}

   * after reading {@code numItemsToReadBeforeSplit} items, or succeeds in a way that is consistent

   * according to {@link #assertSplitAtFractionSucceedsAndConsistent}.

   *

   * <p>Returns SplitAtFractionResult.

   */

  public static <T> SplitAtFractionResult assertSplitAtFractionBehavior(

      BoundedSource<T> source,

      int numItemsToReadBeforeSplit,

      double splitFraction,

      ExpectedSplitOutcome expectedOutcome,

      PipelineOptions options)

      throws Exception {

    return assertSplitAtFractionBehaviorImpl(

        source,

        readFromSource(source, options),

        numItemsToReadBeforeSplit,

        splitFraction,

        expectedOutcome,

        options);

  }



  /**

   * Compares two lists elementwise and throws a detailed assertion failure optimized for human

   * reading in case they are unequal.

   */

  private static <T> void assertListsEqualInOrder(

      String message, String expectedLabel, List<T> expected, String actualLabel, List<T> actual) {

    int i = 0;

    for (; i < expected.size() && i < actual.size(); ++i) {

      if (!Objects.equals(expected.get(i), actual.get(i))) {

        Assert.fail(

            String.format(

                "%s: %s and %s have %d items in common and then differ. "

                    + "Item in %s (%d more): %s, item in %s (%d more): %s",

                message,

                expectedLabel,

                actualLabel,

                i,

                expectedLabel,

                expected.size() - i - 1,

                expected.get(i),

                actualLabel,

                actual.size() - i - 1,

                actual.get(i)));

      }

    }

    if (i < expected.size() /* but i == actual.size() */) {

      Assert.fail(

          String.format(

              "%s: %s has %d more items after matching all %d from %s. First 5: %s",

              message,

              expectedLabel,

              expected.size() - actual.size(),

              actual.size(),

              actualLabel,

              expected.subList(actual.size(), Math.min(expected.size(), actual.size() + 5))));

    } else if (i < actual.size() /* but i == expected.size() */) {

      Assert.fail(

          String.format(

              "%s: %s has %d more items after matching all %d from %s. First 5: %s",

              message,

              actualLabel,

              actual.size() - expected.size(),

              expected.size(),

              expectedLabel,

              actual.subList(expected.size(), Math.min(actual.size(), expected.size() + 5))));

    } else {

      // All is well.

    }

  }



  private static <T> SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehaviorImpl(

      BoundedSource<T> source,

      List<T> expectedItems,

      int numItemsToReadBeforeSplit,

      double splitFraction,

      ExpectedSplitOutcome expectedOutcome,

      PipelineOptions options)

      throws Exception {

    try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {

      BoundedSource<T> originalSource = reader.getCurrentSource();

      List<T> currentItems = readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplit);

      BoundedSource<T> residual = reader.splitAtFraction(splitFraction);

      if (residual != null) {

        assertFalse(

            String.format(

                "Primary source didn't change after a successful split of %s at %f "

                    + "after reading %d items. "

                    + "Was the source object mutated instead of creating a new one? "

                    + "Source objects MUST be immutable.",

                source, splitFraction, numItemsToReadBeforeSplit),

            reader.getCurrentSource() == originalSource);

        assertFalse(

            String.format(

                "Residual source equal to original source after a successful split of %s at %f "

                    + "after reading %d items. "

                    + "Was the source object mutated instead of creating a new one? "

                    + "Source objects MUST be immutable.",

                source, splitFraction, numItemsToReadBeforeSplit),

            reader.getCurrentSource() == residual);

      }

      // Failure cases are: must succeed but fails; must fail but succeeds.

      switch (expectedOutcome) {

        case MUST_SUCCEED_AND_BE_CONSISTENT:

          assertNotNull(

              "Failed to split reader of source: "

                  + source

                  + " at "

                  + splitFraction

                  + " after reading "

                  + numItemsToReadBeforeSplit

                  + " items",

              residual);

          break;

        case MUST_FAIL:

          assertEquals(null, residual);

          break;

        case MUST_BE_CONSISTENT_IF_SUCCEEDS:

          // Nothing.

          break;

      }

      currentItems.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplit > 0));

      BoundedSource<T> primary = reader.getCurrentSource();

      return verifySingleSplitAtFractionResult(

          source,

          expectedItems,

          currentItems,

          primary,

          residual,

          numItemsToReadBeforeSplit,

          splitFraction,

          options);

    }

  }



  private static <T> SourceTestUtils.SplitAtFractionResult verifySingleSplitAtFractionResult(

      BoundedSource<T> source,

      List<T> expectedItems,

      List<T> currentItems,

      BoundedSource<T> primary,

      BoundedSource<T> residual,

      int numItemsToReadBeforeSplit,

      double splitFraction,

      PipelineOptions options)

      throws Exception {

    List<T> primaryItems = readFromSource(primary, options);

    if (residual != null) {

      List<T> residualItems = readFromSource(residual, options);

      List<T> totalItems = new ArrayList<>();

      totalItems.addAll(primaryItems);

      totalItems.addAll(residualItems);

      String errorMsgForPrimarySourceComp =

          String.format(

              "Continued reading after split yielded different items than primary source: "

                  + "split at %s after reading %s items, original source: %s, primary source: %s",

              splitFraction, numItemsToReadBeforeSplit, source, primary);

      String errorMsgForTotalSourceComp =

          String.format(

              "Items in primary and residual sources after split do not add up to items "

                  + "in the original source. Split at %s after reading %s items; "

                  + "original source: %s, primary: %s, residual: %s",

              splitFraction, numItemsToReadBeforeSplit, source, primary, residual);

      Coder<T> coder = primary.getOutputCoder();

      List<ReadableStructuralValue<T>> primaryValues = createStructuralValues(coder, primaryItems);

      List<ReadableStructuralValue<T>> currentValues = createStructuralValues(coder, currentItems);

      List<ReadableStructuralValue<T>> expectedValues =

          createStructuralValues(coder, expectedItems);

      List<ReadableStructuralValue<T>> totalValues = createStructuralValues(coder, totalItems);

      assertListsEqualInOrder(

          errorMsgForPrimarySourceComp, "current", currentValues, "primary", primaryValues);

      assertListsEqualInOrder(

          errorMsgForTotalSourceComp, "total", expectedValues, "primary+residual", totalValues);

      return new SplitAtFractionResult(primaryItems.size(), residualItems.size());

    }

    return new SplitAtFractionResult(primaryItems.size(), -1);

  }



  /**

   * Verifies some consistency properties of {@link BoundedSource.BoundedReader#splitAtFraction} on

   * the given source. Equivalent to the following pseudocode:

   *

   * <pre>

   *   Reader reader = source.createReader();

   *   read N items from reader;

   *   Source residual = reader.splitAtFraction(splitFraction);

   *   Source primary = reader.getCurrentSource();

   *   assert: items in primary == items we read so far

   *                               + items we'll get by continuing to read from reader;

   *   assert: items in original source == items in primary + items in residual

   * </pre>

   */

  public static <T> void assertSplitAtFractionSucceedsAndConsistent(

      BoundedSource<T> source,

      int numItemsToReadBeforeSplit,

      double splitFraction,

      PipelineOptions options)

      throws Exception {

    assertSplitAtFractionBehavior(

        source,

        numItemsToReadBeforeSplit,

        splitFraction,

        ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT,

        options);

  }



  /**

   * Asserts that the {@code source}'s reader fails to {@code splitAtFraction(fraction)} after

   * reading {@code numItemsToReadBeforeSplit} items.

   */

  public static <T> void assertSplitAtFractionFails(

      BoundedSource<T> source,

      int numItemsToReadBeforeSplit,

      double splitFraction,

      PipelineOptions options)

      throws Exception {

    assertSplitAtFractionBehavior(

        source, numItemsToReadBeforeSplit, splitFraction, ExpectedSplitOutcome.MUST_FAIL, options);

  }



  private static class SplitFractionStatistics {

    List<Double> successfulFractions = new ArrayList<>();

    List<Double> nonTrivialFractions = new ArrayList<>();

  }



  /**

   * Asserts that given a start position, {@link BoundedSource.BoundedReader#splitAtFraction} at

   * every interesting fraction (halfway between two fractions that differ by at least one item) can

   * be called successfully and the results are consistent if a split succeeds.

   */

  private static <T> void assertSplitAtFractionBinary(

      BoundedSource<T> source,

      List<T> expectedItems,

      int numItemsToBeReadBeforeSplit,

      double leftFraction,

      SplitAtFractionResult leftResult,

      double rightFraction,

      SplitAtFractionResult rightResult,

      PipelineOptions options,

      SplitFractionStatistics stats)

      throws Exception {

    if (rightFraction - leftFraction < 0.001) {

      // Do not recurse too deeply. Otherwise we will end up in infinite

      // recursion, e.g., while trying to find the exact minimal fraction s.t.

      // split succeeds. A precision of 0.001 when looking for such a fraction

      // ought to be enough for everybody.

      return;

    }

    double middleFraction = (rightFraction + leftFraction) / 2;

    if (leftResult == null) {

      leftResult =

          assertSplitAtFractionBehaviorImpl(

              source,

              expectedItems,

              numItemsToBeReadBeforeSplit,

              leftFraction,

              ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS,

              options);

    }

    if (rightResult == null) {

      rightResult =

          assertSplitAtFractionBehaviorImpl(

              source,

              expectedItems,

              numItemsToBeReadBeforeSplit,

              rightFraction,

              ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS,

              options);

    }

    SplitAtFractionResult middleResult =

        assertSplitAtFractionBehaviorImpl(

            source,

            expectedItems,

            numItemsToBeReadBeforeSplit,

            middleFraction,

            ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS,

            options);

    if (middleResult.numResidualItems != -1) {

      stats.successfulFractions.add(middleFraction);

    }

    if (middleResult.numResidualItems > 0) {

      stats.nonTrivialFractions.add(middleFraction);

    }

    // Two split fractions are equivalent if they yield the same number of

    // items in primary vs. residual source. Left and right are already not

    // equivalent. Recurse into [left, middle) and [right, middle) respectively

    // if middle is not equivalent to left or right.

    if (leftResult.numPrimaryItems != middleResult.numPrimaryItems) {

      assertSplitAtFractionBinary(

          source,

          expectedItems,

          numItemsToBeReadBeforeSplit,

          leftFraction,

          leftResult,

          middleFraction,

          middleResult,

          options,

          stats);

    }

    if (rightResult.numPrimaryItems != middleResult.numPrimaryItems) {

      assertSplitAtFractionBinary(

          source,

          expectedItems,

          numItemsToBeReadBeforeSplit,

          middleFraction,

          middleResult,

          rightFraction,

          rightResult,

          options,

          stats);

    }

  }



  private static final int MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM = 100;

  private static final int MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL = 1000;



  /**

   * Asserts that for each possible start position, {@link

   * BoundedSource.BoundedReader#splitAtFraction} at every interesting fraction (halfway between two

   * fractions that differ by at least one item) can be called successfully and the results are

   * consistent if a split succeeds. Verifies multithreaded splitting as well.

   */

  public static <T> void assertSplitAtFractionExhaustive(

	  BoundedSource<T> source, PipelineOptions options) throws Exception {

    List<T> expectedItems = readFromSource(source, options);

    assertFalse("Empty source", expectedItems.isEmpty());

    assertFalse("Source reads a single item", expectedItems.size() == 1);

    List<List<Double>> allNonTrivialFractions = new ArrayList<>();

    {

      boolean anySuccessfulFractions = false;

      boolean anyNonTrivialFractions = false;

      for (int i = 0; i < expectedItems.size(); i++) {

        SplitFractionStatistics stats = new SplitFractionStatistics();

        assertSplitAtFractionBinary(source, expectedItems, i, 0.0, null, 1.0, null, options, stats);

        if (!stats.successfulFractions.isEmpty()) {

          anySuccessfulFractions = true;

        }

        if (!stats.nonTrivialFractions.isEmpty()) {

          anyNonTrivialFractions = true;

        }

        allNonTrivialFractions.add(stats.nonTrivialFractions);

      }

      assertTrue(

          "splitAtFraction test completed vacuously: no successful split fractions found",

          anySuccessfulFractions);

      assertTrue(

          "splitAtFraction test completed vacuously: no non-trivial split fractions found",

          anyNonTrivialFractions);

    }

    {

      // Perform a stress test of "racy" concurrent splitting:

      // for every position (number of items read), try to split at the minimum nontrivial

      // split fraction for that position concurrently with reading the record at that position.

      // To ensure that the test is non-vacuous, make sure that the splitting succeeds

      // at least once and fails at least once.

      ExecutorService executor = Executors.newFixedThreadPool(2);

      int numTotalTrials = 0;

      for (int i = 0; i < expectedItems.size(); i++) {

        double minNonTrivialFraction = 2.0; // Greater than any possible fraction.

        for (double fraction : allNonTrivialFractions.get(i)) {

          minNonTrivialFraction = Math.min(minNonTrivialFraction, fraction);

        }

        if (minNonTrivialFraction == 2.0) {

          // This will not happen all the time because otherwise the test above would

          // detect vacuousness.

          continue;

        }

        int numTrials = 0;

        boolean haveSuccess = false, haveFailure = false;

        while (true) {

          ++numTrials;

          if (numTrials > MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM) {

            LOG.warn(

                "After {} concurrent splitting trials at item #{}, observed only {}, "

                    + "giving up on this item",

                numTrials,

                i,

                haveSuccess ? "success" : "failure");

            break;

          }

          if (assertSplitAtFractionConcurrent(

              executor, source, expectedItems, i, minNonTrivialFraction, options)) {

            haveSuccess = true;

          } else {

            haveFailure = true;

          }

          if (haveSuccess && haveFailure) {

            LOG.info(

                "{} trials to observe both success and failure of concurrent splitting at item #{}",

                numTrials,

                i);

            break;

          }

        }

        numTotalTrials += numTrials;

        if (numTotalTrials > MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL) {

          LOG.warn(

              "After {} total concurrent splitting trials, considered only {} items, giving up.",

              numTotalTrials,

              i);

          break;

        }

      }

      LOG.info(

          "{} total concurrent splitting trials for {} items",

          numTotalTrials,

          expectedItems.size());

    }

  }



  private static <T> boolean assertSplitAtFractionConcurrent(

      ExecutorService executor,

      BoundedSource<T> source,

      List<T> expectedItems,

      final int numItemsToReadBeforeSplitting,

      final double fraction,

      PipelineOptions options)

      throws Exception {

    @SuppressWarnings("resource") // Closed in readerThread

    final BoundedSource.BoundedReader<T> reader = source.createReader(options);

    final CountDownLatch unblockSplitter = new CountDownLatch(1);

    Future<List<T>> readerThread =

        executor.submit(

            () -> {

              try {

                List<T> items =

                    readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplitting);

                unblockSplitter.countDown();

                items.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplitting > 0));

                return items;

              } finally {

                reader.close();

              }

            });

    Future<KV<BoundedSource<T>, BoundedSource<T>>> splitterThread =

        executor.submit(

            () -> {

              unblockSplitter.await();

              BoundedSource<T> residual = reader.splitAtFraction(fraction);

              if (residual == null) {

                return null;

              }

              return KV.of(reader.getCurrentSource(), residual);

            });

    List<T> currentItems = readerThread.get();

    KV<BoundedSource<T>, BoundedSource<T>> splitSources = splitterThread.get();

    if (splitSources == null) {

      return false;

    }

    SplitAtFractionResult res =

        verifySingleSplitAtFractionResult(

            source,

            expectedItems,

            currentItems,

            splitSources.getKey(),

            splitSources.getValue(),

            numItemsToReadBeforeSplitting,

            fraction,

            options);

    return (res.numResidualItems > 0);

  }



  /**

   * Returns an equivalent unsplittable {@code BoundedSource<T>}.

   *

   * <p>It forwards most methods to the given {@code boundedSource}, except:

   *

   * <ol>

   *   <li>{@link BoundedSource#split} rejects initial splitting by returning itself in a list.

   *   <li>{@link BoundedSource.BoundedReader#splitAtFraction} rejects dynamic splitting by returning null.

   * </ol>

   */

  public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource) {

    return new UnsplittableSource<>(boundedSource);

  }



  private static class UnsplittableSource<T> extends BoundedSource<T> {



    private final BoundedSource<T> boundedSource;



    private UnsplittableSource(BoundedSource<T> boundedSource) {

      this.boundedSource = checkNotNull(boundedSource, "boundedSource");

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      this.boundedSource.populateDisplayData(builder);

    }



    @Override

    public List<? extends BoundedSource<T>> split(

        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {

      return ImmutableList.of(this);

    }



    @Override

    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {

      return boundedSource.getEstimatedSizeBytes(options);

    }



    @Override

    public BoundedReader<T> createReader(PipelineOptions options) throws IOException {

      return new UnsplittableReader<>(boundedSource, boundedSource.createReader(options));

    }



    @Override

    public void validate() {

      boundedSource.validate();

    }



    @Override

    public Coder<T> getOutputCoder() {

      return boundedSource.getOutputCoder();

    }



    private static class UnsplittableReader<T> extends BoundedReader<T> {



      private final BoundedSource<T> boundedSource;

      private final BoundedReader<T> boundedReader;



      private UnsplittableReader(BoundedSource<T> boundedSource, BoundedReader<T> boundedReader) {

        this.boundedSource = checkNotNull(boundedSource, "boundedSource");

        this.boundedReader = checkNotNull(boundedReader, "boundedReader");

      }



      @Override

      public BoundedSource<T> getCurrentSource() {

        return boundedSource;

      }



      @Override

      public boolean start() throws IOException {

        return boundedReader.start();

      }



      @Override

      public boolean advance() throws IOException {

        return boundedReader.advance();

      }



      @Override

      public T getCurrent() throws NoSuchElementException {

        return boundedReader.getCurrent();

      }



      @Override

      public void close() throws IOException {

        boundedReader.close();

      }



      @Override

      @Nullable

      public BoundedSource<T> splitAtFraction(double fraction) {

        return null;

      }



      @Override

      @Nullable

      public Double getFractionConsumed() {

        return boundedReader.getFractionConsumed();

      }



      @Override

      public long getSplitPointsConsumed() {

        return boundedReader.getSplitPointsConsumed();

      }



      @Override

      public long getSplitPointsRemaining() {

        return boundedReader.getSplitPointsRemaining();

      }



      @Override

      public Instant getCurrentTimestamp() throws NoSuchElementException {

        return boundedReader.getCurrentTimestamp();

      }

    }

  }

}